GNBF5010 Course Project¶
Student id: 1155228903
Step 1: Implementation without dictionary¶
- Workflow
In [1]:
def main_part1():
# Example DNA sequence
seq = ('ctccaaagaaattgtagttttcttctggcttagaggtagatcatcttggtccaatcagactgaaa'
'tgccttgaggctagatttcagtctttgtGGCAGCTGgtgaatttctagtttgccttttcagctagggatt'
'agctttttaggggtcccaatgcctagggagatttctaggtcctctgttccttgctgacctccaat')
# Test the function with k=1 and k=2
for k in [1, 2, 3, 4, 5]:
counts = count_kmers_non_dict(seq, k)
write2file_non_dict(counts, f"output_k{k}.txt")
# Implement a function that counts the number of k-mers in a DNA sequence
def count_kmers_non_dict(seq, k):
# Convert the entire sequence to uppercase
seq = seq.upper()
counts = []
for i in range(len(seq) - k + 1):
kmer = seq[i:i+k]
if all(n in "ATCG" for n in kmer): # Check if kmer contains only ATCG
found = False
for j in range(len(counts)):
if counts[j][0] == kmer:
counts[j] = (counts[j][0], counts[j][1] + 1)
found = True
break
if not found:
counts.append((kmer, 1))
return counts
# Write the counts to a file
def write2file_non_dict(counts, filename):
with open(filename, 'w') as f:
for kmer, count in sorted(counts): # Sort by kmer
f.write(f"{kmer}:{count}\n")
# Run Part 1
if __name__ == "__main__":
main_part1()
print("<Implementation without dictionary, done!>")
<Implementation without dictionary, done!>
Step 2: Implementation with dictionary¶
- Workflow
In [2]:
def main_part2():
# Example DNA sequence
seq = ('ctccaaagaaattgtagttttcttctggcttagaggtagatcatcttggtccaatcagactgaaa'
'tgccttgaggctagatttcagtctttgtGGCAGCTGgtgaatttctagtttgccttttcagctagggatt'
'agctttttaggggtcccaatgcctagggagatttctaggtcctctgttccttgctgacctccaat')
# Test the function with k=1 and k=2
for k in [1, 2, 3, 4, 5]:
counts = count_kmers(seq, k)
write2file(counts, f"output_k{k}.txt")
# Implement a function that counts the number of k-mers in a DNA sequence
def count_kmers(seq, k):
# Convert the entire sequence to uppercase
seq = seq.upper()
counts = {}
for i in range(len(seq) - k + 1):
kmer = seq[i:i+k]
if all(n in "ATCG" for n in kmer): # Check if kmer contains only ATCG
counts[kmer] = counts.get(kmer, 0) + 1
return counts
# Write the counts to a file
def write2file(counts, filename):
with open(filename, 'w') as f:
for kmer, count in sorted(counts.items()): # Sort by kmer
f.write(f"{kmer}:{count}\n")
# Run Part 2
if __name__ == "__main__":
main_part2()
print("<Implementation with dictionary, done!>")
<Implementation with dictionary, done!>
- Performance test
In [3]:
import time
# Test the performance of the two implementations
def main():
# Implementation without dictionary
start_time1 = time.time()
main_part1()
end_time1 = time.time()
print("> Time for implementation without dictionary:", end_time1 - start_time1)
# Implementation with dictionary
start_time2 = time.time()
main_part2()
end_time2 = time.time()
print("> Time for implementation with dictionary:", end_time2 - start_time2)
# Determine which implementation is faster
if end_time1 - start_time1 < end_time2 - start_time2:
print("Result: the implementation without dictionary is faster.")
else:
print("Result: the implementation with dictionary is faster.")
if __name__ == "__main__":
main()
print("<Performance test, done!>")
> Time for implementation without dictionary: 0.0055446624755859375 > Time for implementation with dictionary: 0.003008604049682617 Result: the implementation with dictionary is faster. <Performance test, done!>
Step 3: Generalization¶
Function: count the occurrences of k-mers in a given biological sequence file.
Example Command: python count_kmers.py example_chromosome21.fasta 3 -c
python count_kmers.py
: The command to run the script.example_chromosome21.fasta
: The input file in FASTA or FASTQ format.3
: A single k-mer length (in this case, 3) to be counted in the input sequence file.-c
: The flag to compare the performance of the two k-mer counting implementations. When the comparison flag is used, it logs the time taken by each method to a log file named "log.txt".
Output file example (k=1):
A:41
C:41
G:47
T:71
In [ ]:
import time
import os
import argparse
from Bio import SeqIO
def main():
# Parse command-line arguments
parser = argparse.ArgumentParser(description="Count k-mers in a sequence file")
parser.add_argument("input_file", help="Input sequence file (FASTA or FASTQ)")
parser.add_argument("kmer_lengths", help="Comma-separated list of k-mer lengths")
parser.add_argument("-c", action="store_true", help="Compare performance of k-mer counting functions")
args = parser.parse_args()
# Read sequences from input file
input_file = args.input_file
kmer_lengths = list(map(int, args.kmer_lengths.split(',')))
compare = args.c
sequences = read_sequence(input_file)
# Count k-mers for each k-mer length
for k in kmer_lengths:
# Count k-mers using the implementation without dictionary
start_time = time.time()
total_counts = {}
for seq in sequences:
counts = count_kmers(seq, k)
for kmer, count in counts.items():
total_counts[kmer] = total_counts.get(kmer, 0) + count
end_time = time.time()
elapsed_time = end_time - start_time
output_file = f"kmer_counts_k{k}.txt"
with open(output_file, "w") as f:
for kmer, count in total_counts.items():
f.write(f"{kmer}:{count}\n")
# Count k-mers using the implementation with dictionary if compare (-c) is True
if compare and len(kmer_lengths) == 1:
start_time_non_dict = time.time()
total_counts_non_dict = []
for seq in sequences:
counts_non_dict = count_kmers_non_dict(seq, k)
for kmer, count in counts_non_dict:
found = False
for i in range(len(total_counts_non_dict)):
if total_counts_non_dict[i][0] == kmer:
total_counts_non_dict[i] = (total_counts_non_dict[i][0], total_counts_non_dict[i][1] + count)
found = True
break
if not found:
total_counts_non_dict.append((kmer, count))
end_time_non_dict = time.time()
elapsed_time_non_dict = end_time_non_dict - start_time_non_dict
# Write the results to the log file
with open("log.txt", "a") as log_file:
log_file.write(f"> k={k}, dict_time={elapsed_time}, non_dict_time={elapsed_time_non_dict}\n")
# Part 1: Implementation with dictionary
def count_kmers(seq, k):
counts = {}
for i in range(len(seq) - k + 1):
kmer = seq[i:i+k]
if all(n in "ATCG" for n in kmer):
counts[kmer] = counts.get(kmer, 0) + 1
return counts
# Part 2: Implementation without dictionary
def count_kmers_non_dict(seq, k):
counts = []
for i in range(len(seq) - k + 1):
kmer = seq[i:i+k]
if all(n in "ATCG" for n in kmer):
found = False
for j in range(len(counts)):
if counts[j][0] == kmer:
counts[j] = (counts[j][0], counts[j][1] + 1)
found = True
break
if not found:
counts.append((kmer, 1))
return counts
# Read sequences from the input file (FASTA or FASTQ)
def read_sequence(file_path):
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext in ['.fasta', '.fa', '.fna']:
format = 'fasta'
elif file_ext in ['.fastq', '.fq']:
format = 'fastq'
else:
raise ValueError("Unsupported file format")
sequences = []
with open(file_path, "r") as handle:
for record in SeqIO.parse(handle, format):
sequences.append(str(record.seq))
return sequences
if __name__ == "__main__":
main()
Step 4: Extra task¶
Function: Find best matches to a protein query in a protein database using a simple method based on the Jaccard index of tetrapeptide composition.
Example Command: python simple_search.py query.fasta ecoli-k12.fasta
python simple_search.py
: The command to run the script.query.fasta
: The input file containing one or more protein sequences in FASTA format.ecoli-k12.fasta
: The input file containing the protein database to be searched against in FASTA format.
Output file example:
QUERY_ID SUBJECT_ID SIMILARITY_SCORE
Query1 Subject1 0.90
Query1 Subject2 0.85
Query1 Subject3 0.80
Query1 Subject4 0.72
Query1 Subject5 0.50
Query2 Subject1 0.98
Query2 Subject2 0.50
Query2 Subject3 0.45
...
In [ ]:
import argparse
from count_kmers import count_kmers, read_sequence
from Bio import SeqIO
def main():
# Parse command-line arguments
parser = argparse.ArgumentParser(description="Find best matches to a protein query in a protein database")
parser.add_argument("query_file", help="Protein query file (FASTA)")
parser.add_argument("database_file", help="Protein database file (FASTA)")
args = parser.parse_args()
# Read sequences and headers from the input files
query_sequences = read_sequence(args.query_file)
database_sequences = read_sequence(args.database_file)
query_headers = read_fasta_headers(args.query_file)
database_headers = read_fasta_headers(args.database_file)
# Find the best matches for each query sequence in the database
results = []
for query_id, query_seq in enumerate(query_sequences):
query_kmers = set(count_kmers(query_seq, 4).keys())
# Calculate Jaccard index for each pair of query and subject sequences
scores = []
for subject_id, subject_seq in enumerate(database_sequences):
subject_kmers = set(count_kmers(subject_seq, 4).keys())
score = jaccard_index(query_kmers, subject_kmers)
scores.append((subject_id, score))
# Sort the scores in descending order and select the top 5 hits
scores.sort(key=lambda x: x[1], reverse=True)
top_hits = scores[:5]
for subject_id, score in top_hits:
results.append((query_headers[query_id], database_headers[subject_id], score))
# Write the results to the results file
with open("search_results.txt", "w") as f:
f.write("QUERY_ID\tSUBJECT_ID\tSIMILARITY_SCORE\n")
for result in results:
f.write(f"{result[0]}\t{result[1]}\t{result[2]:.0f}\n")
# Calculate the Jaccard index between two sets
def jaccard_index(set1, set2):
intersection = len(set1 & set2)
union = len(set1 | set2)
return (intersection / union) * 100 if union != 0 else 0
# Read FASTA headers from the input file
def read_fasta_headers(file_path):
headers = []
with open(file_path, "r") as handle:
for record in SeqIO.parse(handle, "fasta"):
headers.append(record.id)
return headers
if __name__ == "__main__":
main()